Kotlin Flow 背压和线程切换竟然如此相似 您所在的位置:网站首页 Golang 协程切换的时机 Kotlin Flow 背压和线程切换竟然如此相似

Kotlin Flow 背压和线程切换竟然如此相似

2023-05-08 05:58| 来源: 网络整理| 查看: 265

前言

协程系列文章:

一个小故事讲明白进程、线程、Kotlin 协程到底啥关系? 少年,你可知 Kotlin 协程最初的样子? 讲真,Kotlin 协程的挂起/恢复没那么神秘(故事篇) 讲真,Kotlin 协程的挂起/恢复没那么神秘(原理篇) Kotlin 协程调度切换线程是时候解开真相了 Kotlin 协程之线程池探索之旅(与Java线程池PK) Kotlin 协程之取消与异常处理探索之旅(上) Kotlin 协程之取消与异常处理探索之旅(下) 来,跟我一起撸Kotlin runBlocking/launch/join/async/delay 原理&使用 继续来,同我一起撸Kotlin Channel 深水区 Kotlin 协程 Select:看我如何多路复用 Kotlin Sequence 是时候派上用场了 Kotlin Flow啊,你将流向何方? Kotlin Flow 背压和线程切换竟然如此相似 Kotlin SharedFlow&StateFlow 热流到底有多热? 狂飙吧,Lifecycle与协程、Flow的化学反应 来吧!接受Kotlin 协程--线程池的7个灵魂拷问 当,Kotlin Flow与Channel相逢 这一次,让Kotlin Flow 操作符真正好用起来

上篇分析了Kotlin Flow原理,大部分操作符实现比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇重点分析两者的原理及使用。 通过本篇文章,你将了解到:

什么是背压? 如何处理背压? Flow buffer的原理 Flow 线程切换的使用 Flow 线程切换的原理 1. 什么是背压?

先看自然界的水流:

image.png

为了充分利用水资源,人类建立了大坝,以大坝为分界点将水流分为上游和下游。

当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现

而对于Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,如:

suspend fun testBuffer1() { var flow = flow { //生产者 (1..3).forEach { println("emit $it") emit(it) } } flow.collect { //消费者 println("collect:$it") } } 复制代码

通过collect操作符触发了流,从生产者生产数据(flow闭包),到消费者接收并处理数据(collect闭包),这就完成了流从上游到下游的一次流动过程。

2. 如何处理背压?

模拟一个生产者消费者速度不一致的场景:

suspend fun testBuffer3() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码

计算流从生产到消费的整个时间:

image.png

生产者的速度比消费者的速度快,而它俩都是在同一个线程里顺序执行的,生产者必须等待消费者消费完毕后才会进行下一次生产。 因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。

显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢? 最简单的解决方案:

生产者和消费者分别在不同的线程执行

如:

suspend fun testBuffer4() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.flowOn(Dispatchers.IO) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码

添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里执行,如flow闭包了的代码都在IO线程执行,也就是生产者在IO线程执行。 而消费者在当前线程执行,因此两者无需相互等待,节省了总时间:

image.png

确实是减少了时间,提升了效率。但我们知道开启线程代价还是挺大的,既然都在协程里运行了,能否借助协程的特性:协程挂起不阻塞线程 来完成此事呢? 此时,Buffer出场了,先看看它是如何表演的:

suspend fun testBuffer5() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.buffer(5) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码

这次没有使用flowOn,取而代之的是buffer。 运行结果如下:

image.png

可以看出,生产者消费者都是在同一线程执行,但总耗时却和不在同一线程运行时相差无几。 那么它是如何做到的呢?这就得从buffer的源码说起。

3. Flow buffer的原理 无buffer

先看看没有buffer时的耗时:

suspend fun testBuffer3() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码

image.png

从collect开始,依次执行flow闭包,通过emit调用到collect闭包,因为flow闭包里包含了几次emit,因此整个流程会有几次发射。 如上图,从步骤1到步骤8,因为是在同一个线程里,因此是串行执行的,整个流的耗时即为生产者到消费者(步骤1~步骤8)的耗时。

有buffer

在没看源码之前,我们先猜测一下它的流程:

image.png

每次emit都发送到buffer里,然后立刻回来继续发送,如此一来生产者没有被消费者的速度拖累。 而消费者会检测Buffer里是否有数据,有则取出来。

根据之前的经验我们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了:

是谁触发了collect闭包的调用呢?

接下来深入源码,探究答案。

buffer源码流程分析

创建Flow

public fun Flow.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow { var capacity = capacity//buffer容量 var onBufferOverflow = onBufferOverflow//buffer满之后的处理策略 if (capacity == Channel.CONFLATED) { capacity = 0 onBufferOverflow = BufferOverflow.DROP_OLDEST } // create a flow return when (this) { is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow) //走else 分支,构造ChannelFlowOperatorImpl else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow) } } 复制代码

buffer 返回Flow实例,其间涉及几个重要的类和函数:

image.png

调用collect 当调用Flow.collect时:

public suspend inline fun Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit = collect(object : FlowCollector { override suspend fun emit(value: T) = action(value) }) 复制代码

构造了匿名内部类FlowCollector,并实现了emit方法,它的实现为collect的闭包。

调用ChannelFlowOperatorImpl.collect最终会调用ChannelFlow.collect:

override suspend fun collect(collector: FlowCollector): Unit = coroutineScope { collector.emitAll(produceImpl(this)) } public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) 复制代码

produceImpl 创建了Channel,内部开启了协程,返回ReceiveChannel。

再来看emitAll函数:

private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, consume: Boolean) { ensureActive() var cause: Throwable? = null try { while (true) { //挂起等待Channel数据 val result = run { channel.receiveCatching() } if (result.isClosed) { //Channel关闭后才会退出循环 result.exceptionOrNull()?.let { throw it } break // returns normally when result.closeCause == null } //发送数据 emit(result.getOrThrow()) } } catch (e: Throwable) { cause = e throw e } finally { if (consume) channel.cancelConsumed(cause) } } 复制代码

Channel此时并没有数据,因此协程会挂起等待。

Channel发送 Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。 前面提到过collect之后开启了协程:

public open fun produceImpl(scope: CoroutineScope): ReceiveChannel = scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun) internal val collectToFun: suspend (ProducerScope) -> Unit get() = { collectTo(it) } protected override suspend fun collectTo(scope: ProducerScope) = flowCollect(SendingCollector(scope)) 复制代码

此时传入的参数为:collectToFun,最后构造了:

public class SendingCollector( private val channel: SendChannel ) : FlowCollector { override suspend fun emit(value: T): Unit = channel.send(value) } 复制代码

当协程得到执行时,会调用collectToFun-->collectTo(it)-->flowCollect(SendingCollector(scope)),最终调用到:

#ChannelFlowOperatorImpl override suspend fun flowCollect(collector: FlowCollector) = flow.collect(collector) 复制代码

而该flow为最开始的flow,collector为SendingCollector。 flow.collect后会调用到flow的闭包,进而调用到emit函数:

private fun emit(uCont: Continuation, value: T): Any? { val currentContext = uCont.context currentContext.ensureActive() //... completion = uCont return emitFun(collector as FlowCollector, value, this as Continuation) } 复制代码

emitFun本质上会调用collector里的emit函数,而此时的collector即为SendingCollector,最后调用channel.send(value)

如此一来,Channel就将数据发送出去了,此时channel.receiveCatching()被唤醒,接下来执行emit(result.getOrThrow()),这函数最后会流转到最初始的collect的闭包里。 上面的分析即为生产者到消费者的流转过程,单看源码可能比较乱,看图解惑:

image.png

红色部分和绿色部分分别为不同的协程,它俩的关联点即是蓝色部分。

Flow buffer的本质上是利用了Channel进行数据的发送和接收

buffer为啥能提升效率

前面分析过无buffer时生产者消费者的流程图,作为对比,我们也将加入buffer后生产者消费者的流程图。

image.png

还是以相同的demo,阐述其流程:

生产者挂起1s,当1s结束后调用emit发射数据,此时数据放入buffer里,生产者调用delay继续挂起 此时消费者被唤醒,然后挂起 2s等待 第2s到来之时,生产者调用emit发送数据到buffer里,继续挂起 第2s到来之时,消费者结束挂起,消费数据,然后继续挂起2s 第3s到来之时,生产者继续生产数据,而后生产者退出生产 第5s到来之时,消费者挂起结束,消费数据,然后继续挂起2s 第7s到来之时,消费者挂起结束,消费结束,此时因为channel里已经没有数据了,退出循环,最终消费者退出

由此可见,总共花费了7s。

image.png

ps:协程调度时机不同,打印顺序可能略有差异,但总体耗时不变。

至此,我们找到了buffer能够提高效率的原因:

生产者、消费者运行在不同的协程,挂起操作不阻塞对方

抛出一个比较有意思的问题:以下代码加buffer之后效率会有提升吗?

suspend fun testBuffer6() { var flow = flow { (1..3).forEach { println("emit $it") emit(it) } } var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it") } } println("use time:${time} ms") } 复制代码

在未实验之前,如果你已经有答案,恭喜你已经弄懂了buffer的本质。

4. Flow 线程切换的使用 suspend fun testBuffer4() { var flow = flow { (1..3).forEach { delay(1000) println("emit $it in thread:${Thread.currentThread()}") emit(it) } }.flowOn(Dispatchers.IO) var time = measureTimeMillis { flow.collect { delay(2000) println("collect:$it in thread:${Thread.currentThread()}") } } println("use time:${time} ms") } 复制代码

flowOn(Dispatchers.IO)表示其之前的操作符(函数)都在IO线程执行,如这里的意思是flow闭包里的代码在IO线程执行。 而其之后的操作符(函数)在当前的线程执行。 通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。

5. Flow 线程切换的原理 public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } 复制代码

看到这你可能已经有答案了:这不就和buffer一样的方式吗? 但仔细看,此处多了个上下文:CoroutineContext。 CoroutineContext的作用就是用来决定协程运行在哪个线程。

前面分析的buffer时,我们的协程的作用域是runBlocking,即使生产者、消费者在不同的协程,但是它们始终在同一个线程里执行。 而使用了flowOn指定线程,此时生产者、消费者在不同的线程运行协程。 因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。

image.png

以上为Flow背压和线程切换的全部内容,下篇将分析Flow的热流。 本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力 持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生 2、Android DecorView 必知必会 3、Window/WindowManager 不可不知之事 4、View Measure/Layout/Draw 真明白了 5、Android事件分发全套服务 6、Android invalidate/postInvalidate/requestLayout 彻底厘清 7、Android Window 如何确定大小/onMeasure()多次执行原因 8、Android事件驱动Handler-Message-Looper解析 9、Android 键盘一招搞定 10、Android 各种坐标彻底明了 11、Android Activity/Window/View 的background 12、Android Activity创建到View的显示过 13、Android IPC 系列 14、Android 存储系列 15、Java 并发系列不再疑惑 16、Java 线程池系列 17、Android Jetpack 前置基础系列 18、Android Jetpack 易学易懂系列 19、Kotlin 轻松入门系列 20、Kotlin 协程系列全面解读



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有